package com.sikong.geomesa;

import org.apache.hadoop.util.StringUtils;
import org.geotools.data.DataStore;
import org.geotools.feature.simple.SimpleFeatureBuilder;
import org.geotools.util.factory.Hints;
import org.opengis.feature.simple.SimpleFeature;
import org.opengis.feature.simple.SimpleFeatureType;

import java.io.*;
import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.text.SimpleDateFormat;

/**
 * 这是一段用于从csv文件中向geomesa-hbase批量插入数据的代码
 *
 * @author 梁超
 * @since 0.1.0
 * @Date 2022/03/27
 */
public class BatchUpload {

    /**
     * 1、输入csv的文件路径
     * 2、定义时间解析格式
     */
    private final String CSV = "src/test/resources/weiboData/GuangZhou.csv";
    private final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    /**
     * 此方法用于读取csv文件并
     * @return BufferedReader
     */
    private BufferedReader readCSV() throws IOException {
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(new FileInputStream(this.CSV), StandardCharsets.UTF_8));
        System.out.println(bufferedReader.readLine());

        return bufferedReader;
    }

    /**
     * 此方法用于遍历bufferedReader中每一行，并基于Geotools插入至geomesa-hbase
     * 1、连接至geomesa-hbase并返回dataStore
     * 2、构建要素类，定义表结构
     * 3、基于geotools构建要素并插入
     */
    public void batchUpload() throws IOException, ParseException {
        PutFeature demo = new PutFeature();

        DataStore dataStore = demo.mkConnection("weibo");
        SimpleFeatureType sft = demo.setTable("checkIn", "createdAt:Date,source:String,text:String,textLength:Integer,userDescription:String,userGender:String,userLocation:String,userName:String,address:String,businessArea:String,city:String,cityBlock:String,country:String,district:String,formattedAddress:String,locality:String,region:String,street:String,*geom:Point:srid=4326");
        dataStore.createSchema(sft);

        SimpleFeatureBuilder builder = new SimpleFeatureBuilder(sft);

        String line;
        int idx = 0;
        BufferedReader bufferedReader = this.readCSV();
        while ((line = bufferedReader.readLine()) != null) {
            String[] split = line.split(StringUtils.COMMA_STR);
            builder.set("createdAt", this.sdf.parse(split[0]));
            builder.set("source", split[1]);
            builder.set("text", split[2]);
            builder.set("textLength", Integer.parseInt(split[3]));
            builder.set("userGender", split[4]);
            builder.set("userDescription", split[5]);
            builder.set("userLocation", split[6]);
            builder.set("userName", split[7]);
            builder.set("address", split[8]);
            builder.set("businessArea", split[9]);
            builder.set("city", split[10]);
            builder.set("cityBlock", split[11]);
            builder.set("country", split[12]);
            builder.set("district", split[13]);
            builder.set("formattedAddress", split[14]);
            builder.set("locality", split[15]);
            builder.set("region", split[16]);
            builder.set("street", split[17]);
            builder.set("geom", "POINT (" + split[18] + " " + split[19] + ")");

            builder.featureUserData(Hints.USE_PROVIDED_FID, Boolean.TRUE);
            SimpleFeature feature = builder.buildFeature(String.valueOf(idx));
            demo.writeFeature(dataStore, sft, feature);
            demo.writeClose();
            System.out.println((idx + 1 + "---- Data-ingestion Successful!"));
            idx++;
        }
    }
}
